#  This file is part of parallel-ssh.
#  Copyright (C) 2014-2025 Panos Kittenis.
#  Copyright (C) 2014-2025 parallel-ssh Contributors.
#
#  This library is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation, version 2.1.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

import logging
import os
from collections import deque

from gevent import sleep, get_hub
from gevent.fileobject import FileObjectThread
from gevent.lock import RLock
from gevent.pool import Pool
from gevent.socket import SHUT_RDWR
from gevent.timeout import Timeout as GTimeout
from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN
from ssh2.exceptions import SFTPHandleError, SFTPProtocolError, \
    Timeout as SSH2Timeout
from ssh2.session import Session, LIBSSH2_SESSION_BLOCK_INBOUND, LIBSSH2_SESSION_BLOCK_OUTBOUND, LIBSSH2_FLAG_COMPRESS
from ssh2.sftp import LIBSSH2_FXF_READ, LIBSSH2_FXF_CREAT, LIBSSH2_FXF_WRITE, \
    LIBSSH2_FXF_TRUNC, LIBSSH2_SFTP_S_IRUSR, LIBSSH2_SFTP_S_IRGRP, \
    LIBSSH2_SFTP_S_IWUSR, LIBSSH2_SFTP_S_IXUSR, LIBSSH2_SFTP_S_IROTH, \
    LIBSSH2_SFTP_S_IXGRP, LIBSSH2_SFTP_S_IXOTH

from .tunnel import FORWARDER
from ..base.single import BaseSSHClient, PollMixIn
from ...constants import DEFAULT_RETRIES, RETRY_DELAY
from ...exceptions import SessionError, SFTPError, \
    SFTPIOError, Timeout, SCPError, ProxyError
from ...output import HostOutput

logger = logging.getLogger(__name__)
THREAD_POOL = get_hub().threadpool


class KeepAlive(PollMixIn):
    """Class for handling SSHClient keepalive functionality.

    Spawns a greenlet in its own pool for sending keepalives to a given session.
    """
    __slots__ = ('sock', 'session', '_let', '_pool')

    def __init__(self, sock, session):
        """
        :param sock: The socket session is using to communicate.
        :type sock: :py:class:`gevent.socket.socket`
        :param session: The session keepalive is configured on.
        :type session: :py:class:`ssh2.session.Session`
        """
        super(PollMixIn, self).__init__()
        self._pool = Pool(1)
        self.sock = sock
        self.session = session
        self._let = self._pool.spawn(self._send_keepalive)
        self._let.start()

    def _send_keepalive(self):
        while True:
            if self.session is None or self.sock is None or self.sock.closed:
                return
            sleep_for = self.eagain(self.session.keepalive_send)
            sleep(sleep_for)

    def poll(self, timeout=None):
        """Perform co-operative gevent poll on ssh2 session socket.

        Blocks current greenlet only if socket has pending read or write operations
        in the appropriate direction.
        :param timeout: Deprecated and unused - to be removed.
        """
        self._poll_errcodes(
            self.session.block_directions,
            LIBSSH2_SESSION_BLOCK_INBOUND,
            LIBSSH2_SESSION_BLOCK_OUTBOUND,
        )

    def eagain(self, func, *args, **kwargs):
        return self._eagain_errcode(func, LIBSSH2_ERROR_EAGAIN, *args, **kwargs)


class SSHClient(BaseSSHClient):
    """ssh2-python (libssh2) based non-blocking SSH client."""
    # 2MB buffer
    _BUF_SIZE = 2048 * 1024

    def __init__(self, host,
                 user=None, password=None, port=None,
                 pkey=None, alias=None,
                 num_retries=DEFAULT_RETRIES,
                 retry_delay=RETRY_DELAY,
                 allow_agent=True, timeout=None,
                 forward_ssh_agent=False,
                 proxy_host=None,
                 proxy_port=None,
                 proxy_pkey=None,
                 proxy_user=None,
                 proxy_password=None,
                 _auth_thread_pool=True,
                 keepalive_seconds=60,
                 identity_auth=True,
                 ipv6_only=False,
                 compress=False,
                 keyboard_interactive=False,
                 ):
        """
        :param host: Host name or IP to connect to.
        :type host: str
        :param user: User to connect as. Defaults to logged in user.
        :type user: str
        :param password: Password to use for password authentication.
        :type password: str
        :param alias: Use an alias for this host.
        :type alias: str
        :param port: SSH port to connect to. Defaults to SSH default (22)
        :type port: int
        :param pkey: Private key file path to use for authentication. Path must
          be either absolute path or relative to user home directory
          like ``~/<path>``.
          Bytes type input is used as private key data for authentication.
        :type pkey: str or bytes
        :param num_retries: (Optional) Number of connection and authentication
          attempts before the client gives up. Defaults to 3.
        :type num_retries: int
        :param retry_delay: Number of seconds to wait between retries. Defaults
          to :py:class:`pssh.constants.RETRY_DELAY`
        :type retry_delay: int or float
        :param timeout: SSH session timeout setting in seconds. This controls
          timeout setting of authenticated SSH sessions for each individual SSH operation.
          Also currently sets socket as well as per function timeout in some cases, see
          function descriptions.
        :type timeout: int or float
        :param allow_agent: (Optional) set to False to disable connecting to
          the system's SSH agent
        :type allow_agent: bool
        :param identity_auth: (Optional) set to False to disable attempting to
          authenticate with default identity files from
          `pssh.clients.base.single.BaseSSHClient.IDENTITIES`
        :type identity_auth: bool
        :param forward_ssh_agent: Unused - agent forwarding not implemented.
        :type forward_ssh_agent: bool
        :param proxy_host: Connect to target host via given proxy host.
        :type proxy_host: str
        :param proxy_port: Port to use for proxy connection. Defaults to self.port
        :type proxy_port: int
        :param keepalive_seconds: Interval of keep alive messages being sent to
          server. Set to ``0`` or ``False`` to disable.
        :type keepalive_seconds: int
        :param ipv6_only: Choose IPv6 addresses only if multiple are available
          for the host or raise NoIPv6AddressFoundError otherwise. Note this will
          disable connecting to an IPv4 address if an IP address is provided instead.
        :type ipv6_only: bool
        :param compress: Enable/Disable compression on the client. Defaults to off.
        :type compress: bool
        :param keyboard_interactive: Enable/Disable keyboard interactive authentication with provided username and
          password. An `InvalidAPIUse` error is raised when keyboard_interactive is enabled without a provided password.
          Defaults to off.
        :type keyboard_interactive: bool

        :raises: :py:class:`pssh.exceptions.PKeyFileError` on errors finding
          provided private key.
        :raises: :py:class:`pssh.exceptions.InvalidAPIUseError` when `keyboard_interactive=True` with no password
          provided.
        """
        self.forward_ssh_agent = forward_ssh_agent
        self._forward_requested = False
        self.keepalive_seconds = keepalive_seconds
        self._keepalive_greenlet = None
        self._proxy_client = None
        self.alias = alias
        self.host = host
        self.port = port if port is not None else 22
        if proxy_host is not None:
            _port = port if proxy_port is None else proxy_port
            _pkey = pkey if proxy_pkey is None else proxy_pkey
            _user = user if proxy_user is None else proxy_user
            _password = password if proxy_password is None else proxy_password
            proxy_port = self._connect_proxy(
                proxy_host, _port, _pkey, user=_user, password=_password,
                num_retries=num_retries, retry_delay=retry_delay,
                allow_agent=allow_agent,
                timeout=timeout,
                keepalive_seconds=keepalive_seconds,
                identity_auth=identity_auth,
                compress=compress,
                keyboard_interactive=keyboard_interactive,
            )
            proxy_host = '127.0.0.1'
        self._chan_stdout_lock = RLock()
        self._chan_stderr_lock = RLock()
        super(SSHClient, self).__init__(
            host, user=user, password=password, alias=alias, port=port, pkey=pkey,
            num_retries=num_retries, retry_delay=retry_delay,
            allow_agent=allow_agent, _auth_thread_pool=_auth_thread_pool,
            timeout=timeout,
            proxy_host=proxy_host, proxy_port=proxy_port,
            identity_auth=identity_auth,
            ipv6_only=ipv6_only,
            compress=compress,
            keyboard_interactive=keyboard_interactive,
        )

    def _shell(self, channel):
        return self.eagain(channel.shell)

    def _connect_proxy(self, proxy_host, proxy_port, proxy_pkey,
                       user=None, password=None, alias=None,
                       num_retries=DEFAULT_RETRIES,
                       retry_delay=RETRY_DELAY,
                       allow_agent=True, timeout=None,
                       forward_ssh_agent=False,
                       keepalive_seconds=60,
                       identity_auth=True,
                       compress=False,
                       keyboard_interactive=False,
                       ):
        assert isinstance(self.port, int)
        try:
            self._proxy_client = SSHClient(
                proxy_host, port=proxy_port, pkey=proxy_pkey, alias=alias,
                num_retries=num_retries, user=user, password=password,
                retry_delay=retry_delay, allow_agent=allow_agent,
                timeout=timeout, forward_ssh_agent=forward_ssh_agent,
                identity_auth=identity_auth,
                keepalive_seconds=keepalive_seconds,
                compress=compress,
                keyboard_interactive=keyboard_interactive,
                _auth_thread_pool=False)
        except Exception as ex:
            msg = "Proxy authentication failed. " \
                  "Exception from tunnel client: %s"
            logger.error(msg, ex)
            raise ProxyError(msg, ex)
        if not FORWARDER.started.is_set():
            FORWARDER.start()
            FORWARDER.started.wait()
        FORWARDER.enqueue(self._proxy_client, self.host, self.port)
        proxy_local_port = FORWARDER.out_q.get()
        return proxy_local_port

    def _disconnect(self):
        """Attempt to disconnect session.

        Any errors on calling disconnect are suppressed by this function.

        Does not need to be called directly - called when client object is de-allocated.
        """
        self._keepalive_greenlet = None
        if self.session is not None and self.sock is not None and not self.sock.closed:
            try:
                self._disconnect_eagain()
            except Exception:
                pass
        self.session = None
        # To allow for file descriptor reuse, which is part of gevent, shutdown but do not close socket here.
        # Done by gevent when file descriptor is closed.
        if self.sock is not None and not self.sock.closed:
            try:
                self.sock.shutdown(SHUT_RDWR)
                self.sock.detach()
            except Exception:
                pass
        self.sock = None
        # Notify forwarder that proxy tunnel server can be shutdown
        if isinstance(self._proxy_client, SSHClient):
            FORWARDER.cleanup_server(self._proxy_client)

    def configure_keepalive(self):
        """Configures keepalive on the server for `self.keepalive_seconds`."""
        # Configure keepalives without a reply.
        self.session.keepalive_config(False, self.keepalive_seconds)

    def _init_session(self, retries=1):
        self.session = Session()
        if self.compress:
            self.session.flag(LIBSSH2_FLAG_COMPRESS)

        if self.timeout:
            # libssh2 timeout is in ms
            self.session.set_timeout(self.timeout * 1000)
        try:
            if self._auth_thread_pool:
                THREAD_POOL.apply(self.session.handshake, (self.sock,))
            else:
                self.session.handshake(self.sock)
        except Exception as ex:
            if retries < self.num_retries:
                sleep(self.retry_delay)
                return self._connect_init_session_retry(retries=retries+1)
            msg = "Error connecting to host %s:%s - %s"
            logger.error(msg, self.host, self.port, ex)
            if not self.sock.closed:
                try:
                    self.sock.shutdown(SHUT_RDWR)
                    self.sock.detach()
                except Exception:
                    pass
            if isinstance(ex, SSH2Timeout):
                raise Timeout(msg, self.host, self.port, ex)
            raise

    def _keepalive(self):
        if self.keepalive_seconds:
            self.configure_keepalive()
            self._keepalive_greenlet = KeepAlive(self.sock, self.session)

    def _agent_auth(self):
        self.session.agent_auth(self.user)

    def _pkey_from_memory(self, pkey_data):
        self.session.userauth_publickey_frommemory(
            self.user,
            pkey_data,
            passphrase=self.password if self.password is not None else b'',
        )

    def _password_auth(self):
        if self.keyboard_interactive:
            return self.session.userauth_keyboardinteractive(self.user, self.password)
        return self.session.userauth_password(self.user, self.password)

    def _open_session(self):
        chan = self.eagain(self.session.open_session)
        return chan

    def open_session(self):
        """Open new channel from session.

        :rtype: :py:class:`ssh2.channel.Channel`
        """
        try:
            chan = self._open_session()
        except Exception as ex:
            raise SessionError(ex)
        # if self.forward_ssh_agent and not self._forward_requested:
            # self._eagain(chan.request_auth_agent)
            # self._forward_requested = True
        return chan

    def _make_output_readers(self, channel, stdout_buffer, stderr_buffer):
        # TODO: These greenlets need to be outside client scope or we create a reader <-> client cyclical reference
        _stdout_reader = self._pool.spawn(
            self._read_output_to_buffer, channel.read, stdout_buffer)
        _stderr_reader = self._pool.spawn(
            self._read_output_to_buffer, channel.read_stderr, stderr_buffer)
        return _stdout_reader, _stderr_reader

    def _execute(self, cmd, use_pty=False, channel=None):
        """
        Use ``run_command`` which returns a ``HostOutput`` object rather than this function directly.

        Execute command on remote server.

        :param cmd: Command to execute.
        :type cmd: str
        :param use_pty: Whether or not to obtain a PTY on the channel.
        :type use_pty: bool
        :param channel: Use provided channel for execute rather than creating
          a new one.
        :type channel: :py:class:`ssh2.channel.Channel`

        :rtype: :py:class:`ssh2.channel.Channel`
        """
        channel = self.open_session() if channel is None else channel
        if use_pty:
            self.eagain(channel.pty)
        logger.debug("Executing command '%s'", cmd)
        self.eagain(channel.execute, cmd)
        return channel

    def _read_output_to_buffer(self, read_func, _buffer, is_stderr=False):
        _lock = self._chan_stderr_lock if is_stderr else self._chan_stdout_lock
        try:
            while True:
                with _lock:
                    size, data = read_func()
                if size == LIBSSH2_ERROR_EAGAIN:
                    self.poll()
                    continue
                if size <= 0:
                    break
                _buffer.write(data)
                sleep()
        finally:
            _buffer.eof.set()

    def wait_finished(self, host_output, timeout=None):
        """Wait for EOF from channel and close channel.

        Used to wait for remote command completion and be able to gather
        exit code.

        :param host_output: Host output of command to wait for.
        :type host_output: :py:class:`pssh.output.HostOutput`
        :param timeout: Timeout value in seconds - defaults to no timeout.
        :type timeout: float

        :raises: :py:class:`pssh.exceptions.Timeout` after <timeout> seconds if
          timeout given.
        """
        if not isinstance(host_output, HostOutput):
            raise ValueError("%s is not a HostOutput object" % (host_output,))
        channel = host_output.channel
        if channel is None:
            return
        with GTimeout(seconds=timeout, exception=Timeout):
            self.eagain(channel.wait_eof)
            # Close channel to indicate no more commands will be sent over it
            self.close_channel(channel)

    def close_channel(self, channel):
        """Close given channel, handling EAGAIN."""
        with self._chan_stdout_lock, self._chan_stderr_lock:
            logger.debug("Closing channel")
            self.eagain(channel.close)

    def eagain(self, func, *args, **kwargs):
        """Handle EAGAIN and call given function with any args, polling for as long as there is data to receive."""
        return self._eagain_errcode(func, LIBSSH2_ERROR_EAGAIN, *args, **kwargs)

    def _make_sftp_eagain(self):
        return self.eagain(self.session.sftp_init)

    def _make_sftp(self):
        try:
            sftp = self._make_sftp_eagain()
        except Exception as ex:
            raise SFTPError(ex)
        return sftp

    def _mkdir(self, sftp, directory):
        """Make directory via SFTP channel.

        :param sftp: SFTP client object
        :type sftp: :py:class:`ssh2.sftp.SFTP`
        :param directory: Remote directory to create
        :type directory: str

        :raises: :py:class:`pssh.exceptions.SFTPIOError` on SFTP IO errors
        """
        mode = LIBSSH2_SFTP_S_IRUSR | \
            LIBSSH2_SFTP_S_IWUSR | \
            LIBSSH2_SFTP_S_IXUSR | \
            LIBSSH2_SFTP_S_IRGRP | \
            LIBSSH2_SFTP_S_IROTH | \
            LIBSSH2_SFTP_S_IXGRP | \
            LIBSSH2_SFTP_S_IXOTH
        try:
            self.eagain(sftp.mkdir, directory, mode)
        except SFTPProtocolError as error:
            msg = "Error occured creating directory %s on host %s - %s"
            logger.error(msg, directory, self.host, error)
            raise SFTPIOError(msg, directory, self.host, error)
        logger.debug("Created remote directory %s", directory)

    def copy_file(self, local_file, remote_file, recurse=False, sftp=None):
        """Copy local file to host via SFTP.

        :param local_file: Local filepath to copy to remote host
        :type local_file: str
        :param remote_file: Remote filepath on remote host to copy file to
        :type remote_file: str
        :param recurse: Whether or not to descend into directories recursively.
        :type recurse: bool
        :param sftp: SFTP channel to use instead of creating a
          new one.
        :type sftp: :py:class:`ssh2.sftp.SFTP`

        :raises: :py:class:`ValueError` when a directory is supplied to
          ``local_file`` and ``recurse`` is not set
        :raises: :py:class:`pssh.exceptions.SFTPError` on SFTP initialisation
          errors
        :raises: :py:class:`pssh.exceptions.SFTPIOError` on I/O errors writing
          via SFTP
        :raises: :py:class:`IOError` on local file IO errors
        :raises: :py:class:`OSError` on local OS errors like permission denied
        """
        sftp = self._make_sftp() if sftp is None else sftp
        if os.path.isdir(local_file) and recurse:
            return self._copy_dir(local_file, remote_file, sftp)
        elif os.path.isdir(local_file) and not recurse:
            raise ValueError("Recurse must be true if local_file is a "
                             "directory.")
        destination = self._remote_paths_split(remote_file)
        if destination is not None:
            try:
                self.eagain(sftp.stat, destination)
            except (SFTPHandleError, SFTPProtocolError):
                self.mkdir(sftp, destination)
        self.sftp_put(sftp, local_file, remote_file)
        logger.info("Copied local file %s to remote destination %s:%s",
                    local_file, self.host, remote_file)

    def _sftp_put(self, remote_fh, local_file):
        with FileObjectThread(local_file, 'rb', self._BUF_SIZE, threadpool=THREAD_POOL) as local_fh:
            data = local_fh.read(self._BUF_SIZE)
            while data:
                self.eagain_write(remote_fh.write, data)
                data = local_fh.read(self._BUF_SIZE)

    def sftp_put(self, sftp, local_file, remote_file):
        """Perform an SFTP put - copy local file path to remote via SFTP.

        :param sftp: SFTP client object.
        :type sftp: :py:class:`ssh2.sftp.SFTP`
        :param local_file: Local filepath to copy to remote host.
        :type local_file: str
        :param remote_file: Remote filepath on remote host to copy file to.
        :type remote_file: str

        :raises: :py:class:`pssh.exceptions.SFTPIOError` on I/O errors writing
          via SFTP.
        """
        mode = LIBSSH2_SFTP_S_IRUSR | \
            LIBSSH2_SFTP_S_IWUSR | \
            LIBSSH2_SFTP_S_IRGRP | \
            LIBSSH2_SFTP_S_IROTH
        f_flags = LIBSSH2_FXF_CREAT | LIBSSH2_FXF_WRITE | LIBSSH2_FXF_TRUNC
        with self._sftp_openfh(
                sftp.open, remote_file, f_flags, mode) as remote_fh:
            try:
                self._sftp_put(remote_fh, local_file)
            except SFTPProtocolError as ex:
                msg = "Error writing to remote file %s - %s"
                logger.error(msg, remote_file, ex)
                raise SFTPIOError(msg, remote_file, ex)

    def mkdir(self, sftp, directory):
        """Make directory via SFTP channel.

        Parent paths in the directory are created if they do not exist.

        :param sftp: SFTP client object
        :type sftp: :py:class:`ssh2.sftp.SFTP`
        :param directory: Remote directory to create
        :type directory: str

        Catches and logs at error level remote IOErrors on creating directory.
        """
        _paths_to_create = deque()
        for d in directory.split('/'):
            if not d:
                continue
            _paths_to_create.append(d)
        cwd = '' if directory.startswith('/') else '.'
        while _paths_to_create:
            cur_dir = _paths_to_create.popleft()
            cwd = '/'.join([cwd, cur_dir])
            try:
                self.eagain(sftp.stat, cwd)
            except (SFTPHandleError, SFTPProtocolError) as ex:
                logger.debug("Stat for %s failed with %s", cwd, ex)
                self._mkdir(sftp, cwd)

    def copy_remote_file(self, remote_file, local_file, recurse=False,
                         sftp=None, encoding='utf-8'):
        """Copy remote file to local host via SFTP.

        :param remote_file: Remote filepath to copy from
        :type remote_file: str
        :param local_file: Local filepath where file(s) will be copied to
        :type local_file: str
        :param recurse: Whether or not to recursively copy directories
        :type recurse: bool
        :param encoding: Encoding to use for file paths.
        :type encoding: str
        :param sftp: SFTP channel to use instead of creating a
          new one.
        :type sftp: :py:class:`ssh2.sftp.SFTP`

        :raises: :py:class:`ValueError` when a directory is supplied to
          ``local_file`` and ``recurse`` is not set
        :raises: :py:class:`pssh.exceptions.SFTPError` on SFTP initialisation
          errors
        :raises: :py:class:`pssh.exceptions.SFTPIOError` on I/O errors reading
          from SFTP
        :raises: :py:class:`IOError` on local file IO errors
        :raises: :py:class:`OSError` on local OS errors like permission denied
        """
        sftp = self._make_sftp() if sftp is None else sftp
        try:
            self.eagain(sftp.stat, remote_file)
        except (SFTPHandleError, SFTPProtocolError):
            msg = "Remote file or directory %s on host %s does not exist"
            logger.error(msg, remote_file, self.host)
            raise SFTPIOError(msg, remote_file, self.host)
        try:
            dir_h = self._sftp_openfh(sftp.opendir, remote_file)
        except SFTPError:
            pass
        else:
            if not recurse:
                raise ValueError("Recurse must be true if remote_file is a "
                                 "directory.")
            file_list = self._sftp_readdir(dir_h)
            return self._copy_remote_dir(file_list, remote_file,
                                         local_file, sftp,
                                         encoding=encoding)
        destination = os.path.join(os.path.sep, os.path.sep.join(
            [_dir for _dir in local_file.split('/')
             if _dir][:-1]))
        self._make_local_dir(destination)
        self.sftp_get(sftp, remote_file, local_file)
        logger.info("Copied local file %s from remote destination %s:%s",
                    local_file, self.host, remote_file)

    def _scp_recv_recursive(self, remote_file, local_file, sftp, encoding='utf-8'):
        try:
            self.eagain(sftp.stat, remote_file)
        except (SFTPHandleError, SFTPProtocolError):
            msg = "Remote file or directory %s does not exist"
            logger.error(msg, remote_file)
            raise SCPError(msg, remote_file)
        try:
            dir_h = self._sftp_openfh(sftp.opendir, remote_file)
        except SFTPError:
            # remote_file is not a dir, scp file
            return self.scp_recv(remote_file, local_file, encoding=encoding)
        try:
            os.makedirs(local_file)
        except OSError:
            pass
        file_list = self._sftp_readdir(dir_h)
        return self._scp_recv_dir(file_list, remote_file,
                                  local_file, sftp,
                                  encoding=encoding)

    def scp_recv(self, remote_file, local_file, recurse=False, sftp=None,
                 encoding='utf-8'):
        """Copy remote file to local host via SCP.

        Note - Remote directory listings are gathered via SFTP when
        ``recurse`` is enabled - SCP lacks directory list support.
        Enabling recursion therefore involves creating an extra SFTP channel
        and requires SFTP support on the server.

        :param remote_file: Remote filepath to copy from
        :type remote_file: str
        :param local_file: Local filepath where file(s) will be copied to
        :type local_file: str
        :param recurse: Whether or not to recursively copy directories
        :type recurse: bool
        :param sftp: The SFTP channel to use instead of creating a new one.
          Only used when ``recurse`` is ``True``.
        :type sftp: :py:class:`ssh2.sftp.SFTP`
        :param encoding: Encoding to use for file paths when recursion is
          enabled.
        :type encoding: str

        :raises: :py:class:`pssh.exceptions.SCPError` on errors copying file.
        :raises: :py:class:`IOError` on local file IO errors.
        :raises: :py:class:`OSError` on local OS errors like permission denied.
        """
        if recurse:
            sftp = self._make_sftp() if sftp is None else sftp
            return self._scp_recv_recursive(remote_file, local_file, sftp, encoding=encoding)
        elif local_file.endswith('/'):
            remote_filename = remote_file.rsplit('/')[-1]
            local_file += remote_filename
        destination = os.path.join(os.path.sep, os.path.sep.join(
            [_dir for _dir in local_file.split('/')
             if _dir][:-1]))
        self._make_local_dir(destination)
        self._scp_recv(remote_file, local_file)
        logger.info("SCP local file %s from remote destination %s:%s",
                    local_file, self.host, remote_file)

    def _scp_recv(self, remote_file, local_file):
        try:
            (file_chan, fileinfo) = self.eagain(
                self.session.scp_recv2, remote_file)
        except Exception as ex:
            msg = "Error copying file %s from host %s - %s"
            logger.error(msg, remote_file, self.host, ex)
            raise SCPError(msg, remote_file, self.host, ex)
        local_fh = FileObjectThread(local_file, 'wb', threadpool=THREAD_POOL)
        try:
            total = 0
            while total < fileinfo.st_size:
                size, data = file_chan.read(size=fileinfo.st_size - total)
                if size == LIBSSH2_ERROR_EAGAIN:
                    self.poll()
                    continue
                total += size
                local_fh.write(data)
        finally:
            local_fh.flush()
            local_fh.close()
            file_chan.close()

    def scp_send(self, local_file, remote_file, recurse=False, sftp=None):
        """Copy local file to host via SCP.

        Note - Directories are created via SFTP when ``recurse`` is enabled -
        SCP lacks directory create support. Enabling recursion therefore
        involves creating an extra SFTP channel and requires SFTP support on the
        server.

        :param local_file: Local filepath to copy to remote host
        :type local_file: str
        :param remote_file: Remote filepath on remote host to copy file to
        :type remote_file: str
        :param sftp: The SFTP channel to use instead of creating a new one.
          Only used when ``recurse`` is ``True``.
        :type sftp: :py:class:`ssh2.sftp.SFTP`
        :param recurse: Whether or not to descend into directories recursively.
        :type recurse: bool

        :raises: :py:class:`ValueError` when a directory is supplied to
          ``local_file`` and ``recurse`` is not set
        :raises: :py:class:`pssh.exceptions.SFTPError` on SFTP initialisation
          errors
        :raises: :py:class:`pssh.exceptions.SFTPIOError` on I/O errors writing
          via SFTP
        :raises: :py:class:`IOError` on local file IO errors
        :raises: :py:class:`OSError` on local OS errors like permission denied
        """
        if os.path.isdir(local_file) and recurse:
            sftp = self._make_sftp() if sftp is None else sftp
            return self._scp_send_dir(local_file, remote_file, sftp)
        elif os.path.isdir(local_file) and not recurse:
            raise ValueError("Recurse must be True if local_file is a "
                             "directory.")
        if recurse:
            destination = self._remote_paths_split(remote_file)
            if destination is not None:
                sftp = self._make_sftp() if sftp is None else sftp
                try:
                    self.eagain(sftp.stat, destination)
                except (SFTPHandleError, SFTPProtocolError):
                    self.mkdir(sftp, destination)
        elif remote_file.endswith('/'):
            local_filename = local_file.rsplit('/')[-1]
            remote_file += local_filename
        logger.info("SCP local file %s to remote destination %s:%s",
                    local_file, self.host, remote_file)
        self._scp_send(local_file, remote_file)

    def _scp_send(self, local_file, remote_file):
        fileinfo = os.stat(local_file)
        try:
            chan = self.eagain(
                self.session.scp_send64,
                remote_file, fileinfo.st_mode & 0o777, fileinfo.st_size,
                fileinfo.st_mtime, fileinfo.st_atime)
        except Exception as ex:
            msg = "Error opening remote file %s for writing on host %s - %s"
            logger.error(msg, remote_file, self.host, ex)
            raise SCPError(msg, remote_file, self.host, ex)
        try:
            with FileObjectThread(local_file, 'rb', 2097152, threadpool=THREAD_POOL) as local_fh:
                data = local_fh.read(self._BUF_SIZE)
                while data:
                    self.eagain_write(chan.write, data)
                    data = local_fh.read(self._BUF_SIZE)
        except Exception as ex:
            msg = "Error writing to remote file %s on host %s - %s"
            logger.error(msg, remote_file, self.host, ex)
            raise SCPError(msg, remote_file, self.host, ex)
        finally:
            self.eagain(chan.flush)
            self.eagain(chan.send_eof)
            self.eagain(chan.wait_eof)
            self.eagain(chan.wait_closed)

    def _sftp_openfh(self, open_func, remote_file, *args):
        try:
            fh = self.eagain(open_func, remote_file, *args)
        except Exception as ex:
            raise SFTPError(ex)
        return fh

    def _sftp_get(self, remote_fh, local_file):
        with FileObjectThread(local_file, 'wb', threadpool=THREAD_POOL) as local_fh:
            for size, data in remote_fh:
                if size == LIBSSH2_ERROR_EAGAIN:
                    self.poll()
                    continue
                local_fh.write(data)

    def sftp_get(self, sftp, remote_file, local_file):
        with self._sftp_openfh(
                sftp.open, remote_file,
                LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR) as remote_fh:
            try:
                self._sftp_get(remote_fh, local_file)
            except SFTPProtocolError as ex:
                msg = "Error reading from remote file %s - %s"
                logger.error(msg, remote_file, ex)
                raise SFTPIOError(msg, remote_file, ex)

    def get_exit_status(self, channel):
        """Get exit status code for channel or ``None`` if not ready.

        :param channel: The channel to get status from.
        :type channel: :py:mod:`ssh2.channel.Channel`
        :rtype: int or ``None``
        """
        if not channel.eof():
            return
        return channel.get_exit_status()

    def finished(self, channel):
        """Checks if remote command has finished - has server sent client
        EOF.

        :rtype: bool
        """
        if channel is None:
            return
        return channel.eof()

    def poll(self, timeout=None):
        """Perform co-operative gevent poll on ssh2 session socket.

        Blocks current greenlet only if socket has pending read or write operations
        in the appropriate direction.
        :param timeout: Deprecated and unused - to be removed.
        """
        self._poll_errcodes(
            self.session.block_directions,
            LIBSSH2_SESSION_BLOCK_INBOUND,
            LIBSSH2_SESSION_BLOCK_OUTBOUND,
        )

    def eagain_write(self, write_func, data):
        """Write data with given write_func for an ssh2-python session while
        handling EAGAIN and resuming writes from last written byte on each call to
        write_func.
        """
        return self._eagain_write_errcode(write_func, data, LIBSSH2_ERROR_EAGAIN)
